package voldemort.store.cachestore.impl;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import voldemort.store.cachestore.CacheBlock;
import voldemort.store.cachestore.Key;

import java.nio.channels.FileChannel;
import java.util.Iterator;

import static voldemort.store.cachestore.BlockUtil.*;

/**
 * Created by IntelliJ IDEA.
 * User: thsieh
 * Date: Oct 7, 2010
 * Time: 4:11:30 PM
 * To change this template use File | Settings | File Templates.
 */

/**
 * Background task that packs one data channel of a {@link CacheStore}.
 * <p>
 * {@link #run()} creates a new channel, makes it the store's current channel, and rewrites every
 * block that still belongs to {@code store.getList().get(index)} into the current channel. When no
 * record fails, the old data channel is closed and {@code store.truncate(index)} is called.
 * <p>
 * Packing can be triggered in three ways: the data file reaches its maximum size, the current
 * overflow reaches its limit, or the user issues a pack command.
 */

public class PackThread implements Runnable {
    private static final Log logger = LogFactory.getLog(PackThread.class);

    /** Store whose channel is being packed. */
    private final CacheStore store;
    /** Position in {@code store.getList()} of the channel whose blocks are moved out. */
    private final int index;

    /**
     * @param store the store that owns the channel list and the key-to-block map
     * @param index position in {@code store.getList()} of the channel to pack
     */
    public PackThread(CacheStore store, int index) {
        this.store = store;
        this.index = index;
    }

    /**
     * Moves every block whose {@code getIndex()} equals {@code index} into the store's current
     * channel. Failures on individual records are logged and counted; the old channel is only
     * closed and truncated when the error count is zero. Any other failure is logged and
     * swallowed so that it never propagates out of the thread running this task.
     */
    public void run() {
        logger.info("Start packing data for channel index " + index );
        try {
            // Create the destination channel and make it current before any block is moved.
            int k = store.createChannel( index);
            store.setCurIndex(k);
            FileChannel oldChannel = store.getList().get(index).getDataChannel();
            logger.info( index + " index channel status "+ oldChannel.isOpen() );
            // Report the NEW channel here; the original queried the old index a second time.
            logger.info( k + " curindex channel status "+ store.getList().get(k).getDataChannel().isOpen() );
            Iterator<Key> it = store.getMap().keySet().iterator();
            int error = 0 ;
            // NOTE(review): counts every key visited without an exception, including keys that
            // are skipped below, not only the blocks actually moved.
            int record = 0;
            while ( it.hasNext() ) {
                try {
                    Key key = it.next() ;
                    long keyOffset2Len = 0;
                    // Re-read the current index per record instead of caching the channel.
                    ChannelStore channel =  store.getList().get( store.getCurIndex());
                    CacheBlock block = store.getMap().get( key);
                    if ( block == null ) {
                        // The key was taken from the key set but has no mapping any more.
                        logger.info("key "+key.toString() +" is not longer in map");
                    }
                    else if ( block.getIndex() != index ) {
                        // Block belongs to a different channel; nothing to move.
                        logger.info("skip, block.getIndex "+block.getIndex() + " not equal to " + index);
                    }
                    else {
                        byte[] keyBytes = toKeyBytes(key);
                        synchronized( block) {
                            if ( block.getData() == null ) {
                                // Data is not held by the block: load it from the old data
                                // channel so it can be written to the new one.
                                byte[] data =  store.getChannel( block).readChannel( block.getDataOffset2Len(), oldChannel);
                                block.setData( data);
                            }
                            // Take the store lock while reserving space in the destination
                            // channel and re-pointing the block (same lock as put, per the
                            // original author's comment).
                            store.getLock().lock();
                            try {
                                // Point the block at the next free data offset of the new channel.
                                // NOTE(review): the block is mutated before checkFileSize runs;
                                // confirm a failed check cannot leave the block half-updated.
                                block.setDataOffset(channel.getDataOffset());
                                checkFileSize(channel.getDataOffset(), block.getDataLen());
                                block.setRecordNo( channel.getTotalRecord() );
                                // Version and block size are unchanged, so they are not updated.
                                keyOffset2Len = convertOffset4Len( channel.getKeyOffset(), keyBytes.length);
                                // Advance the destination channel's counters past this record.
                                channel.setTotalRecord(channel.getTotalRecord()+1);
                                channel.setDataOffset( channel.getDataOffset() + block.getDataLen());
                                channel.setKeyOffset( channel.getKeyOffset() + keyBytes.length );
                                // Finally mark the block as living in the current channel.
                                block.setIndex( store.getCurIndex());
                            } finally {
                                store.getLock().unlock();
                            }
                        }
                        // Write the record outside both locks, into the space reserved above.
                        channel.writeNewBlock(block, keyOffset2Len, keyBytes);
                    }
                    record ++;
                } catch (Exception ex) {
                    // Keep going: one bad record must not abort the whole pack.
                    error ++ ;
                    logger.error( ex.getMessage(), ex);
                }
            }
            if ( error == 0 ) {
                logger.info("complete packing data for channel index " + index +" total record "+record);
                // Everything was moved without error: release the old channel.
                oldChannel.close();
                store.truncate(index);
            } else {
                // Leave the old channel untouched when any record failed.
                logger.error("pack data total " + error);
            }
        } catch ( Throwable th) {
            // Deliberately swallowed after logging: this runs as a background task.
            logger.error( th.getMessage(), th);
        }
    }


}
